posix条件变量与互斥锁 示例生产者消费者问题

posix条件变量

一种线程间同步的情形:线程A需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执行。

pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());

一个Condition Variable总是和一个Mutex搭配使用的。一个线程可以调用pthread_cond_wait在一个Condition Variable上阻塞等待,这个函数做以下三步操作:

1. 解锁;如果没有解锁,`其他线程,没法进入临界区`,修改条件,条件将始终没法成立。
2. 等待通知;
3. 得到通知返回前重新加锁; 重新加锁,是`下面的解锁操作对应起来`

3个操作是原子性的操作,之所以一开始要释放Mutex,是因为需要让其他线程进入临界区去更改条件,或者也有其他线程需要进入临界区等待条件。

一个线程可以调用pthread_cond_signal唤醒在某个Condition Variable上等待的第一个线程,也可以调用pthread_cond_broadcast唤醒在这个Condition Variable上等待的所有线程。
Reactor

条件变量使用规范

等待条件代码

pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);

给条件发送通知代码

pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
pthread_cond_signal(cond); 向第一个等待线程,发起通知

我们在使用pthread条件变量wait函数的时候总是使用while去做判断而不是使用if,因为等待在条件变量上的线程被唤醒有可能不是因为条件满足而是由于虚假唤醒(Spurious wakeups)。虚假唤醒在POSIX标准里是默认允许的,所以wait返回只是代表共享数据有可能被改变,因此必须要重新判断。

那么什么时候会出现虚假唤醒呢?在多核处理器下,pthread_cond_signal可能会激活多于一个线程(阻塞在条件变量上的线程)。结果是,当一个线程调用pthread_cond_signal()后,多个调用pthread_cond_wait()或pthread_cond_timedwait()的线程返回。

生产者消费者问题

#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

#define CONSUMERS_COUNT 2
#define PRODUCERS_COUNT 1

pthread_mutex_t g_mutex;
pthread_cond_t g_cond;

pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT];

int nready = 0;

void *consume(void *arg)
{
    int num = (int)arg;
    while (1)
    {
        pthread_mutex_lock(&g_mutex);
        while (nready == 0)
        {
            printf("%d begin wait a condtion ...\n", num);
            pthread_cond_wait(&g_cond, &g_mutex);
        }

        printf("%d end wait a condtion ...\n", num);
        printf("%d begin consume product ...\n", num);
        --nready;
        printf("%d end consume product ...\n", num);
        pthread_mutex_unlock(&g_mutex);
        sleep(1);
    }
    return NULL;
}

void *produce(void *arg)
{
    int num = (int)arg;
    while (1)
    {
        pthread_mutex_lock(&g_mutex);
        printf("%d begin produce product ...\n", num);
        ++nready;
        printf("%d end produce product ...\n", num);
        pthread_cond_signal(&g_cond);
        printf("%d signal ...\n", num);
        pthread_mutex_unlock(&g_mutex);
        sleep(1);
    }
    return NULL;
}

int main(void)
{
    int i;

    pthread_mutex_init(&g_mutex, NULL);
    pthread_cond_init(&g_cond, NULL);


    for (i = 0; i < CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void *)i);

    sleep(1);

    for (i = 0; i < PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)i);

    for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);

    pthread_mutex_destroy(&g_mutex);
    pthread_cond_destroy(&g_cond);

    return 0;
}

在上面程序中,++nready就当作是生产产品了,—nready就当作是消费产品了,这里没有缓冲区大小的概念,可以当作是无界缓冲区,生产者可以一直生产,但消费者只有当有产品的时候才能消费,否则就得等待,等待结束的条件就是nready > 0。